RocketMQ 关于网络方面核心类图如下所示:
接下来先一一介绍各个类的主要职责。
RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:
void start()
:启动远程服务。void shutdown()
:关闭。void registerRPCHook(RPCHook rpcHook)
:注册 RPC
钩子函数,有利于在执行网络操作的前后执行定制化逻辑。
远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。
void registerProcessor(int requestCode, NettyRequestProcessor processor,ExecutorService executor)
注册命令处理器,这里是 RocketMQ Netty 网络设计的核心亮点,RocketMQ 会按照业务逻辑进行拆分,例如消息发送、消息拉取等每一个网络操作会定义一个请求编码(requestCode),然后每一个类型对应一个业务处理器 NettyRequestProcessor,并可以按照不同的 requestCode 定义不同的线程池,实现不同请求的线程池隔离。其参数说明如下。
int requestCode
命令编码,rocketmq 中所有的请求命令在 RequestCode 中定义。
NettyRequestProcessor processor
RocketMQ 请求业务处理器,例如消息发送的处理器为 SendMessageProcessor,PullMessageProcessor 为消息拉取的业务处理器。
ExecutorService executor
线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行。
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode)
根据请求编码获取对应的请求业务处理器与线程池。
RemotingCommand invokeSync(Channel channel, RemotingCommand request,long timeoutMillis)
同步请求调用,参数说如下:
void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
异步请求调用。
void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis)
Oneway 请求调用。
Netty 远程服务抽象实现类,定义网络远程调用、请求,响应等处理逻辑,其核心方法与核心方法的设计理念如下。
NettyRemotingAbstract 核心属性:
Semaphore semaphoreOneway:控制 oneway 发送方式的并发度的信号量,默认为 65535 个许可。
Semaphore semaphoreAsync:控制异步发送方式的并发度的信号量,默认为 65535 个许可。
ConcurrentMap<Integer /* opaque */, ResponseFuture>
responseTable
:当前正在等待对端返回的请求处理表,其中 opaque
表示请求的编号,全局唯一,通常采用原子递增,通常套路是客户端向对端发送网络请求时,通常会采取单一长连接,故发送请求后会向调用端立即返回
ResponseFuture,同时会将请求放入到该映射表中,然后收到客户端响应时(客户端响应会包含请求
code),然后从该映射表中获取对应的
ResponseFutre,然后通知调用端的返回结果,这里是Future
模式在网络编程中的经典运用。
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
:注册的请求处理命令。RocketMQ 的设计中采用了不同请求命令支持不同的线程池,即实现业务线程池的隔离。
Pair<NettyRequestProcessor, ExecutorService>
defaultRequestProcessor
:默认命令处理线程池。
List<RPCHook> rpcHooks
:注册的 RPC 钩子函数列表。基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
其核心属性说明如下:
ConcurrentMap<String /* addr */, ChannelWrapper> channelTables
:当前客户端已创建的连接(网络通道、Netty
Cannel),每一个地址一条长连接。
基于 Netty 网络编程服务端。
其核心属性如下所示:
Timer timer = new Timer("ServerHouseKeepingService", true)
:定时扫描器,对
NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的请求移除。
基于 Netty 实现的请求命令处理器,即在服务端各个业务处理逻辑,例如处理消息发送的 SendMessageProcessor。
关于 NettyRemotingServer、NettyRemotingClient 将在下文继续深入探讨,通过类图的方式对了解 Netty 网络设计的精髓还不太直观,下面再给出一张流图,进一步阐释 RocketMQ 网络设计的精髓。
其核心关键点说明如下:
上述流程图将省略 NettyRemotingClient、NettyRemotingServer 的初始化流程,因为这些将在下文详细阐述。
对网络编程基本的流程掌握后,我们接下来学习 NettyRemotingServer、NettyRemotingClient 的具体实现代码,来掌握 Netty 服务端、客户端的编写技巧。
基于网络编程模型,通常需要解决的问题:
基于网络的编程,其实是面向二进制流,我们以大家最熟悉的的 Dubbo RPC 访问请求为例进行更直观的讲解,Dubbo 的通讯过程如下所示:
例如一个订单服务 order-serevice-app,用户会发起多个下单服务,在 order-service-app 中就会对应多个线程,订单服务需要调用优惠券相关的微服务,多个线程通过 dubbo client 向优惠券发起 RPC 调用,这个过程至少需要做哪些操作呢?
接下来我们将从 RocketMQ 中是如何使用的,从而来探究 Netty 的学习与使用。
1. 客户端创建示例与要点
在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创建核心代码被封装在 start 方法中,其代码截图如下图所示:
上述代码基本就是使用 Netty 编程创建客户端的标准模板,其关键点说明如下。
创建 DefaultEventExecutorGroup,默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行,故其本质就是一个线程池。
通过 Netty 提供的工具类 Bootstrap 来创建 Netty 客户端,其 group 方法指定一个事件循环组(EventLoopGroup),即 Work 线程组,主要是封装事件选择器(java.nio.Selector),默认情况下读写事件在该线程组中执行,俗称 IO 线程,但可以改变默认行为,下面会对这个加以详细解释;同时通过 chanel 方法指定通道的类型,基于 NIO 的客户端,通常使用 NioSocketChannel。
通过 Bootstrap 的 option 设置网络通信相关的参数,通常情况下会指定如下参数:
通过 Bootstrap 的 hanle 方法构建事件处理链条,通常通过使用 new ChannelInitializer<SocketChannel>()
。
通过 ChannelPipeline 的 addLast 方法构建事件处理链,这里是基于 Netty 的核心扩展点,应用程序的业务逻辑就是通过该事件处理器进行切入的。RocketMQ 中事件处理链说明如下:
即基于 Netty 的编程,主要包括制定通信协议(编码解码)、业务处理。下文会一一介绍。
ChannelPipeline 的 addLast 方法重点介绍:
如果调用在添加事件处理器时没有传入 EventExecutorGroup,那事件的执行默认在 Work 线程组,如果指定了,事件的执行将在传入的线程池中执行。
2. 创建连接及要点
上面的初始化并没有创建连接,在 RocketMQ 中在使用时才会创建连接,当然连接创建后就可以复用、缓存,即我们常说的长连接。基于 Netty 创建连接的示例代码如下:
这个基本上也是基于 Netty 的客户端创建连接的模板,其实现要点如下:
public boolean isOK() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
3. 请求发送示例
以同步消息发送为例我们来看一下消息发送的使用示例,其示例代码如下:
使用关键点如下:
ConcurrentMap<Integer
/* opaque */, ResponseFuture> responseTable
,当客户端收到服务端的响应后,需要根据
opaque 查找到对应的 ResponseFuture,从而唤醒客户端。
1. Netty 服务端创建示例
Step1:创建 Boss、Work 事件线程组。Netty 的服务端线程模型采用的是主从多 Reactor 模型,会创建两个线程组,分别为 Boss Group 与 Work Group,其创建示例如下图所示:
通常 Boos Group 默认使用一个线程,而 Work 线程组通常为 CPU 的合数,Work 线程组通常为 IO 线程池,处理读写事件。
Step2:创建默认事件执行线程组。
关于该线程池的作用与客户端类似,故不重复介绍。
Step3:使用 Netty ServerBootstrap 服务端启动类构建服务端。(模板)
通过 ServerBootstrap 构建的关键点如下:
Step4:调用 ServerBootstrap 的 bind 方法绑定到指定端口。
ServerBootstrap 的 bind 的方法是一个非阻塞方法,调用 sync() 方法会变成阻塞方法,即等待服务端启动完成。
2. Netty ServerHandler 编写示例
服务端在网络通信方面无非就是接受请求并处理,然后将响应发送到客户端,处理请求的入口通常通过定义 ChannelHandler,我们来看一下 RocketMQ 中编写的 Handler。
服务端的业务处理 Handler 主要是接受客户端的请求,故通常关注的是读事件,可以通常继承 SimpleChannelInboundHandler,并实现 channelRead0,由于已经经过了解码器(NettyDecoder),已经将请求解码成具体的请求对象了,在 RocketMQ 中使用 RemotingCommand 对象,只需面向该对象进行编程,processMessageReceived 该方法是 NettyRemotingClient、NettyRemotingServer 的父类,故对于服务端来会调用 processReqeustCommand 方法。
在基于 Netty4 的编程,在 ChannelHandler 加上@ChannelHandler.Sharable 可实现线程安全。
温馨提示:在 ChannelHandler 中通常不会执行具体的业务逻辑,通常是只负责请求的分发,其背后会引入线程池进行异步解耦,在 RocketMQ 的实现中更加如此,在 RocketMQ 提供了基于“业务”的线程池隔离,例如会为消息发送、消息拉取分别创建不同的线程池。这部分内容将在下文详细介绍。
基于网络编程,通信协议的制定是最最重要的工作,通常关于通信协议的设计套路如下:
通常采用的是 Header + Body 这种结构,通常 Header 部分是固定长度,并且在 Header 部分会有一个字段来标识整条消息的长度,至于头结点中是否会放置其他字段。这种结构非常经典,实现简单,特别适合在接收端从二进制流中解码请求,其关键点如下:
由于这种模式非常通用,故 Netty 提供了该解码的通用实现类:LengthFieldBasedFrameDecoder,即能够从二级制流中读取一个完整的消息自己缓存区,应用程序自己实现将 ByteBuf 转换为特定的请求对象即可,NettyDecoder 的示例如下:
而 NettyEncoder 的职责就是将请求对象转换成 ByteBuf,即转换成二级制流,这个对象转换为上图中协议格式(Header + Body)这种格式即可。
通常服务端接收请求,经过解码器解码后转换成请求对象,服务端需要根据请求对象进行对应的业务处理,避免业务处理阻塞 IO 读取线程,通常业务的处理会采用额外的线程池,即业务线程池,RocketMQ 在这块采用的方式值得我们借鉴,提供了不同业务采用不同的线程池,实现线程隔离机制。
RocketMQ 为每一个请求进行编码,然后每一类请求会对应一个 Proccess(业务处理逻辑),并且将 Process 注册到指定线程池,实现线程隔离机制。
Step1:首先在服务端启动时会先进行静态注册,将请求处理器与执行的线程池进行对应,其代码示例如下:
Step2:服务端接受到请求对象后,根据请求命令获取对应的 Processor 与线程池,然后将任务提交到线程池中执行,其代码示例如下所示(NettyRemotingAbstract#processRequestCommand)。
本篇就介绍到这里了,以 RocketMQ 中使用 Netty 编程为切入点,梳理出基于 Netty 进行网络编程的套路。